package com.simafei.flow.web.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Keeps track of the {@link SseEmitter}s registered per execution id and
 * broadcasts payloads to all emitters of a given id.
 *
 * <p>Emitters are stored in a {@link ConcurrentHashMap} of
 * {@link CopyOnWriteArrayList}s; an id's entry is dropped from the map as soon
 * as its last emitter is unregistered, so finished executions do not
 * accumulate empty lists.
 *
 * @author fengpengju
 */
@Component
@Slf4j
public class SseHelper {

    /** Sentinel payload: after it has been sent to an emitter, that emitter is completed. */
    public static final String CLOSE_TAG = "[DONE]";

    private final Map<String, CopyOnWriteArrayList<SseEmitter>> emitterMap;

    public SseHelper() {
        this.emitterMap = new ConcurrentHashMap<>();
    }

    /**
     * Registers an emitter under the given execution id and wires its
     * completion, error and timeout callbacks so that it unregisters itself.
     *
     * @param execId  execution id the emitter listens to
     * @param emitter emitter to register
     */
    public void addEmitter(String execId, SseEmitter emitter) {
        emitter.onCompletion(() -> removeEmitter(execId, emitter));

        emitter.onError(e -> {
            log.warn("SseEmitter onError", e);
            emitter.completeWithError(e);
            removeEmitter(execId, emitter);
        });

        emitter.onTimeout(() -> {
            log.warn("SseEmitter onTimeout");
            emitter.complete();
            removeEmitter(execId, emitter);
        });

        // compute() creates-or-reuses the list and adds to it in one atomic
        // step per key, so the add cannot race with removeEmitter() dropping
        // an emptied list from the map.
        emitterMap.compute(execId, (key, emitters) -> {
            final CopyOnWriteArrayList<SseEmitter> target =
                    emitters != null ? emitters : new CopyOnWriteArrayList<>();
            target.add(emitter);
            return target;
        });
    }

    /**
     * Sends {@code data} as JSON to every emitter registered under
     * {@code execId}. Does nothing when no emitter is registered for the id.
     *
     * <p>When {@code data} equals {@link #CLOSE_TAG}, each emitter is completed
     * and unregistered after the send. An emitter whose send fails is
     * completed with the error and unregistered; the remaining emitters are
     * still served.
     *
     * @param execId execution id whose emitters should receive the payload
     * @param data   payload to send
     */
    @Async
    public void send(String execId, Object data) {
        final CopyOnWriteArrayList<SseEmitter> emitters = emitterMap.get(execId);
        if (emitters == null) {
            return;
        }
        // equals() instead of ==: an equal "[DONE]" string that is not the
        // same object as the constant must still close the stream.
        final boolean closing = CLOSE_TAG.equals(data);
        emitters.forEach(emitter -> {
            try {
                emitter.send(data, MediaType.APPLICATION_JSON);
                if (closing) {
                    emitter.complete();
                    removeEmitter(execId, emitter);
                }
            } catch (Exception e) {
                log.warn("SseEmitter send failed, execId={}", execId, e);
                emitter.completeWithError(e);
                // Unregister right away so the broken emitter is not retried
                // on the next send.
                removeEmitter(execId, emitter);
            }
        });
    }

    /**
     * Sends {@link #CLOSE_TAG} to every emitter registered under
     * {@code execId}, which completes them.
     *
     * <p>NOTE(review): {@code send} is called directly on {@code this} here, so
     * Spring's proxy-based {@code @Async} presumably does not apply to the
     * nested call and it runs on the thread executing this method - confirm.
     *
     * @param execId execution id whose emitters should be closed
     */
    @Async
    public void complete(String execId) {
        send(execId, CLOSE_TAG);
    }

    /**
     * Unregisters {@code emitter} from {@code execId} and drops the id's map
     * entry once its list is empty. Safe to call more than once for the same
     * emitter.
     */
    private void removeEmitter(String execId, SseEmitter emitter) {
        emitterMap.computeIfPresent(execId, (key, emitters) -> {
            emitters.remove(emitter);
            // Returning null removes the mapping.
            return emitters.isEmpty() ? null : emitters;
        });
    }
}
